On-Time Flight Performance with Spark and Cosmos DB (Las Vegas)

On-Time Flight Performance Background

This notebook provides an analysis of On-Time Flight Performance and Departure Delays data using GraphFrames for Apache Spark.

Source Data:

References:

Spark to Cosmos DB Connector

Connecting Apache Spark to Azure Cosmos DB accelerates your ability to solve fast-moving data science problems, where your data can be quickly persisted and retrieved using Azure Cosmos DB's DocumentDB API. With the Spark to Cosmos DB connector, you can more easily address scenarios including (but not limited to) blazing-fast IoT pipelines, updatable columns when performing analytics, push-down predicate filtering, and advanced analytics and data science against fast-changing data in a geo-replicated managed document store with guaranteed SLAs for consistency, availability, low latency, and throughput.

The Spark to Cosmos DB connector utilizes the Azure DocumentDB Java SDK. The data flow is as follows:

  1. A connection is made from the Spark master node to the Cosmos DB gateway node to obtain the partition map. Note that the user only specifies the Spark and Cosmos DB connections; the fact that it connects to the respective master and gateway nodes is transparent to the user.
  2. This information is provided back to the Spark master node. At this point, we can parse the query to determine which partitions (and their locations) within Cosmos DB need to be accessed.
  3. This information is transmitted to the Spark worker nodes ...
  4. This allows the Spark worker nodes to connect directly to the Cosmos DB partitions to extract the data that is needed and bring it back to the Spark partitions within the Spark worker nodes.
In [1]:
%%configure
{ "name":"Spark-to-Cosmos_DB_Connector", 
  "executorMemory": "8G", 
  "executorCores": 2, 
  "numExecutors": 2, 
  "driverCores": 2,
  "jars": ["wasb:///example/jars/0.0.3c/azure-documentdb-1.10.0.jar","wasb:///example/jars/0.0.3c/azure-cosmosdb-spark-0.0.3-SNAPSHOT.jar"],
  "conf": {
    "spark.jars.packages": "graphframes:graphframes:0.5.0-spark2.1-s_2.11",   
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
}
In [2]:
# Connection
flightsConfig = {
"Endpoint" : "https://doctorwho.documents.azure.com:443/",
"Masterkey" : "xWpfqUBioucC2YkWV6uHVhgZtsPIjIVmE4VDPyNYnw2QUazvCHm3rnn9AeSgglLOT3yfjCR5YbLeh5MCc3aKNw==",
"Database" : "DepartureDelays",
"preferredRegions" : "Central US",
"Collection" : "flights_pcoll", 
"SamplingRatio" : "1.0",
"schema_samplesize" : "1000",
"query_pagesize" : "2147483647",
"query_custom" : "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c"
}
Starting Spark application
SparkSession available as 'spark'.
In [3]:
flights = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**flightsConfig).load()
flights.count()
flights.cache()
DataFrame[origin: string, distance: int, date: int, delay: int, destination: string]
In [4]:
flights.createOrReplaceTempView("flights")

Obtaining airport code information

In [5]:
# Set File Paths
airportsnaFilePath = "wasb://data@doctorwhostore.blob.core.windows.net/airport-codes-na.txt"

# Obtain airports dataset
airportsna = spark.read.csv(airportsnaFilePath, header='true', inferSchema='true', sep='\t')
airportsna.createOrReplaceTempView("airports")

Flights departing from Las Vegas

In [6]:
%%sql
select count(1) from flights where origin = 'LAS'
count(1)
33107

Top 10 Delayed Destinations originating from Las Vegas

In [7]:
%%sql
select concat(concat((dense_rank() OVER (PARTITION BY 1 ORDER BY TotalDelays DESC)-1), '. '), destination) as destination, TotalDelays
from (
select a.city as destination, sum(f.delay) as TotalDelays, count(1) as Trips
from flights f
join airports a
  on a.IATA = f.destination
where f.origin = 'LAS'
and f.delay > 0
group by a.city 
order by sum(delay) desc limit 10
) a
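The `concat(dense_rank() ... - 1, '. ')` prefix in the query above is purely cosmetic: it bakes the rank into the destination label so the rows keep their descending-delay order even in a viewer that re-sorts strings. A pure-Python sketch of the same trick (the delay totals are made up for illustration, and ties are ignored for brevity):

```python
# Hypothetical total-delay figures per destination city.
delays = {"Chicago": 4619, "Denver": 3964, "Dallas": 2821}

# Sort by total delay descending, then prefix each city with its zero-based
# rank, mirroring the (dense_rank() - 1) label built in the SQL query.
ranked = sorted(delays.items(), key=lambda kv: kv[1], reverse=True)
labels = [f"{i}. {city}" for i, (city, total) in enumerate(ranked)]
print(labels)  # ['0. Chicago', '1. Denver', '2. Dallas']
```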

Calculate median delays by destination cities departing from Las Vegas

In [8]:
%%sql
select a.city as destination, percentile_approx(f.delay, 0.5) as median_delay
from flights f
join airports a
  on a.IATA = f.destination
where f.origin = 'LAS'
group by a.city 
order by percentile_approx(f.delay, 0.5)
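Note that `percentile_approx(f.delay, 0.5)` returns an *approximate* median, which scales to large distributed datasets. For intuition, here is the exact equivalent on a small in-memory sample (the delay values are made up):

```python
import statistics

# Hypothetical departure delays (minutes) for one destination.
delays = [-5, -2, 0, 3, 7, 12, 45]

# Exact median; percentile_approx(delay, 0.5) approximates this at scale.
print(statistics.median(delays))  # 3
```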

Building up a GraphFrame

Using GraphFrames for Apache Spark to run degree and motif queries against Cosmos DB

In [9]:
# Build `departureDelays` DataFrame
departureDelays = spark.sql("select cast(f.date as int) as tripid, cast(concat(concat(concat(concat(concat(concat('2014-', concat(concat(substr(cast(f.date as string), 1, 2), '-')), substr(cast(f.date as string), 3, 2)), ' '), substr(cast(f.date as string), 5, 2)), ':'), substr(cast(f.date as string), 7, 2)), ':00') as timestamp) as `localdate`, cast(f.delay as int), cast(f.distance as int), f.origin as src, f.destination as dst, o.city as city_src, d.city as city_dst, o.state as state_src, d.state as state_dst from flights f join airports o on o.iata = f.origin join airports d on d.iata = f.destination") 

# Create Temporary View and cache
departureDelays.createOrReplaceTempView("departureDelays")
departureDelays.cache()
DataFrame[tripid: int, localdate: timestamp, delay: int, distance: int, src: string, dst: string, city_src: string, city_dst: string, state_src: string, state_dst: string]
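The nested `concat`/`substr` expression above rebuilds a timestamp from an integer date encoded as MMDDHHmm. A pure-Python sketch of the same transformation; the zero-padding is an assumption added here to guard against a leading zero being dropped when the value was stored as an int:

```python
def to_local_date(date_int, year=2014):
    """Rebuild 'YYYY-MM-DD HH:MM:00' from an int encoded as MMDDHHmm.

    zfill(8) restores a leading zero lost in int storage
    (e.g. 01012124 stored as 1012124) -- an assumption, not
    part of the original SQL expression.
    """
    s = str(date_int).zfill(8)
    return f"{year}-{s[0:2]}-{s[2:4]} {s[4:6]}:{s[6:8]}:00"

print(to_local_date(1012124))  # 2014-01-01 21:24:00
```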
In [12]:
# Note, ensure you have already installed the GraphFrames spark-package
import os
sc.addPyFile(os.path.expanduser('./graphframes_graphframes-0.5.0-spark2.1-s_2.11.jar'))
from pyspark.sql.functions import *
from graphframes import *

# Create Vertices (airports) and Edges (flights)
tripVertices = airportsna.withColumnRenamed("IATA", "id").distinct()
tripEdges = departureDelays.select("tripid", "delay", "src", "dst", "city_dst", "state_dst")

# Cache Vertices and Edges
tripEdges.cache()
tripVertices.cache()

# Create TripGraph
tripGraph = GraphFrame(tripVertices, tripEdges)

Which flights departing LAS have the most significant average delays?

Note, the joins are there to show the city names instead of the IATA codes. The dense_rank() code is there to help order the data correctly when viewed in Jupyter notebooks.

In [13]:
flightDelays = tripGraph.edges.filter("src = 'LAS' and delay > 0").groupBy("src", "dst").avg("delay").sort(desc("avg(delay)"))
flightDelays.createOrReplaceTempView("flightDelays")
In [21]:
%%sql
select concat(concat((dense_rank() OVER (PARTITION BY 1 ORDER BY avg_delay DESC)-1), '. '), city) as destination, 
avg_delay
from (
select a.city, `avg(delay)` as avg_delay 
from flightDelays f
join airports a
on f.dst = a.iata
order by `avg(delay)` 
desc limit 10
) s

Which is the most important airport (in terms of connections)

It would take a relatively complicated SQL statement to calculate all of the edges to a single vertex, grouped by the vertices. Instead, we can use the graph degree method.
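Conceptually, `degrees` just counts the edges (in plus out) touching each vertex. A plain-Python sketch over a hypothetical edge list:

```python
from collections import Counter

# Hypothetical flight edges (src, dst); tripGraph.edges plays this role above.
edges = [("SEA", "SJC"), ("SJC", "BUF"), ("SEA", "BUF"), ("LAS", "SEA")]

# Degree = number of edges touching each vertex, in either direction.
degree = Counter()
for src, dst in edges:
    degree[src] += 1
    degree[dst] += 1

print(degree["SEA"])  # 3
```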

In [22]:
airportConnections = tripGraph.degrees.sort(desc("degree"))
airportConnections.createOrReplaceTempView("airportConnections")
In [25]:
%%sql
select concat(concat((dense_rank() OVER (PARTITION BY 1 ORDER BY degree DESC)-1), '. '), city) as destination, 
degree
from (
select a.city, f.degree 
from airportConnections f 
join airports a
  on a.iata = f.id
order by f.degree desc 
limit 10
) a

Are there direct flights between Seattle and San Jose?

In [29]:
filteredPaths = tripGraph.bfs(
    fromExpr = "id = 'SEA'",
    toExpr = "id = 'SJC'",
    maxPathLength = 1)
filteredPaths.show()
+--------------------+--------------------+--------------------+
|                from|                  e0|                  to|
+--------------------+--------------------+--------------------+
|[Seattle,WA,USA,SEA]|[1010600,-2,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1012030,-4,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1011215,-6,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1011855,-3,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1010710,-1,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1020600,2,SEA,SJ...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1022030,-3,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1021600,-2,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1021215,-9,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1021855,-1,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1020710,-9,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1030600,-5,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1032030,-1,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1031600,-7,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1031215,-3,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1031855,-1,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1030710,-5,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1040600,4,SEA,SJ...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1042030,-2,SEA,S...|[San Jose,CA,USA,...|
|[Seattle,WA,USA,SEA]|[1041215,-4,SEA,S...|[San Jose,CA,USA,...|
+--------------------+--------------------+--------------------+
only showing top 20 rows

But are there any direct flights between San Jose and Buffalo?

  • Try maxPathLength = 1, which means one edge (i.e. one flight) between SJC and BUF: a direct flight.
  • Try maxPathLength = 2, which means two edges between SJC and BUF: all the different variations of flights between San Jose and Buffalo with only one stopover in between.
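The bounded-path search `tripGraph.bfs` performs can be sketched in pure Python: enumerate paths from source to destination using at most `max_path_length` edges. The adjacency list below is a hypothetical toy graph, not the real flight data:

```python
from collections import deque

# Toy graph: SJC has no direct edge to BUF, but two one-stop routes exist.
graph = {"SJC": ["BOS", "PHX"], "BOS": ["BUF"], "PHX": ["BUF"], "BUF": []}

def bfs_paths(graph, src, dst, max_path_length):
    """Return all paths from src to dst with at most max_path_length edges."""
    paths, queue = [], deque([[src]])
    while queue:
        path = queue.popleft()
        if path[-1] == dst:
            paths.append(path)
            continue
        if len(path) - 1 < max_path_length:  # edges used so far
            for nxt in graph.get(path[-1], []):
                queue.append(path + [nxt])
    return paths

print(bfs_paths(graph, "SJC", "BUF", 1))  # [] -> no direct flight
print(bfs_paths(graph, "SJC", "BUF", 2))  # two one-stop paths
```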
In [33]:
filteredPaths = tripGraph.bfs(
  fromExpr = "id = 'SJC'",
  toExpr = "id = 'BUF'",
  maxPathLength = 1)
filteredPaths.show()
+----+-----+-------+---+
|City|State|Country| id|
+----+-----+-------+---+
+----+-----+-------+---+
In [34]:
filteredPaths = tripGraph.bfs(
  fromExpr = "id = 'SJC'",
  toExpr = "id = 'BUF'",
  maxPathLength = 2)
filteredPaths.show()
+--------------------+--------------------+-------------------+--------------------+--------------------+
|                from|                  e0|                 v1|                  e1|                  to|
+--------------------+--------------------+-------------------+--------------------+--------------------+
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1010635,-6,BOS,B...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1011059,13,BOS,B...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1011427,19,BOS,B...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1020635,-4,BOS,B...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1021059,0,BOS,BU...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1021427,194,BOS,...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1030635,0,BOS,BU...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1031059,0,BOS,BU...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1031427,0,BOS,BU...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1040635,16,BOS,B...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1041552,96,BOS,B...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1050635,1,BOS,BU...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1051059,48,BOS,B...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1051427,443,BOS,...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1060635,0,BOS,BU...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1061059,294,BOS,...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1061427,0,BOS,BU...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1070730,0,BOS,BU...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1071730,0,BOS,BU...|[Buffalo,NY,USA,BUF]|
|[San Jose,CA,USA,...|[1012124,16,SJC,B...|[Boston,MA,USA,BOS]|[1080710,0,BOS,BU...|[Buffalo,NY,USA,BUF]|
+--------------------+--------------------+-------------------+--------------------+--------------------+
only showing top 20 rows

In that case, what is the most common transfer point between San Jose and Buffalo?

In [35]:
commonTransferPoint = filteredPaths.groupBy("v1.id", "v1.City").count().orderBy(desc("count"))
commonTransferPoint.createOrReplaceTempView("commonTransferPoint")
In [38]:
%%sql
select concat(concat((dense_rank() OVER (PARTITION BY 1 ORDER BY Trips DESC)-1), '. '), city) as destination, 
Trips
from (
select City, `count` as Trips from commonTransferPoint order by Trips desc limit 10
) a